/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.job;

import static org.apache.phoenix.query.QueryServicesOptions.UNLIMITED_QUEUE_SIZE;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.phoenix.util.EnvironmentEdgeManager;

/**
 * A bounded blocking queue implementation that keeps a virtual queue of elements on a per-producer
 * basis and iterates through the producer queues in round-robin fashion.
 */
public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E> {

  /** Maps a producer key to the list holding that producer's queued elements. */
  private final Map<Object, ProducerList<E>> producerMap;
  /** Producer lists in round-robin order; a list is dropped as soon as it becomes empty. */
  private final LinkedList<ProducerList<E>> producerLists;
  /** Monitor guarding all mutable state; also used for wait/notify between producers/consumers. */
  private final Object lock;
  /** Whether a newly seen producer is served next (true) or last (false). */
  private final boolean newProducerToFront;
  /** Index into {@link #producerLists} of the producer whose element is handed out next. */
  private int currentProducer;
  /** Total number of queued elements across all producers. */
  private int size;
  /** Capacity bound, or {@code UNLIMITED_QUEUE_SIZE} when the queue is unbounded. */
  private final int maxSize;

  /**
   * Construct an AbstractRoundRobinQueue that limits the size of the queued elements to at most
   * maxSize. New producers are placed at the end of the round-robin list. Once the queue is full,
   * {@link #offer(Object)} returns false and {@link #put(Object)} blocks.
   * @param maxSize maximum number of queued elements, or {@code UNLIMITED_QUEUE_SIZE} for no limit
   */
  public AbstractRoundRobinQueue(int maxSize) {
    this(maxSize, false);
  }

  /**
   * @param maxSize            maximum number of queued elements, or {@code UNLIMITED_QUEUE_SIZE}
   *                           for no limit
   * @param newProducerToFront If true, new producers go to the front of the round-robin list, if
   *                           false, they go to the end.
   */
  public AbstractRoundRobinQueue(int maxSize, boolean newProducerToFront) {
    this.producerMap = new HashMap<Object, ProducerList<E>>();
    this.producerLists = new LinkedList<ProducerList<E>>();
    this.lock = new Object();
    this.newProducerToFront = newProducerToFront;
    this.maxSize = maxSize;
  }

  /**
   * Returns an iterator over a snapshot of all queued elements. Elements are grouped by producer,
   * starting with the producer that is served next and wrapping around so that every producer is
   * included. Because the iterator walks a copy, calling {@code remove()} on it does not modify
   * this queue; use {@link #remove(Object)} instead.
   */
  @Override
  public Iterator<E> iterator() {
    synchronized (lock) {
      ArrayList<E> allElements = new ArrayList<E>(this.size);
      int producerCount = this.producerLists.size();
      if (producerCount > 0) {
        ListIterator<ProducerList<E>> iter =
          this.producerLists.listIterator(this.currentProducer);
        for (int visited = 0; visited < producerCount; visited++) {
          if (!iter.hasNext()) {
            // Reached the tail: wrap around to pick up the producers that sit before the
            // current pointer, which would otherwise be missing from the snapshot.
            iter = this.producerLists.listIterator();
          }
          allElements.addAll(iter.next().list);
        }
      }
      return allElements.iterator();
    }
  }

  /**
   * Inserts the element, waiting up to the given timeout for space to become available.
   * @return true if the element was added, false if the timeout elapsed first
   * @throws NullPointerException if {@code o} is null
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
    boolean taken = false;
    long endAt = deadlineAfter(timeout, unit);
    synchronized (lock) {
      long waitTime = endAt - EnvironmentEdgeManager.currentTimeMillis();
      while (!(taken = offer(o)) && waitTime > 0) {
        this.lock.wait(waitTime);
        waitTime = endAt - EnvironmentEdgeManager.currentTimeMillis();
      }
    }
    return taken;
  }

  /**
   * Inserts the element into its producer's list without blocking.
   * @return true if the element was added, false if the queue is bounded and currently full
   * @throws NullPointerException if {@code o} is null
   */
  @Override
  public boolean offer(E o) {
    if (o == null) {
      throw new NullPointerException();
    }

    final Object producerKey = extractProducer(o);

    synchronized (lock) {
      if (this.maxSize != UNLIMITED_QUEUE_SIZE && this.size == this.maxSize) {
        return false;
      }
      ProducerList<E> producerList = this.producerMap.get(producerKey);
      if (producerList == null) {
        producerList = new ProducerList<E>(producerKey);
        this.producerMap.put(producerKey, producerList);
        // Insert at the current position. Advancing the pointer afterwards leaves the new
        // producer just behind it, i.e. at the end of the round-robin order.
        this.producerLists.add(this.currentProducer, producerList);
        if (!this.newProducerToFront) {
          incrementCurrentProducerPointer();
        }
      }
      producerList.list.add(o);
      this.size++;
      lock.notifyAll();
    }
    return true;
  }

  /**
   * Implementations must extract the producer object which is used as the key to identify a unique
   * producer.
   */
  protected abstract Object extractProducer(E o);

  /**
   * Inserts the element, waiting for space if the queue is full. This override does not declare
   * {@link InterruptedException}, so the wait is uninterruptible: an interrupt received while
   * waiting is remembered and the thread's interrupt status is restored before returning.
   * @throws NullPointerException if {@code o} is null
   */
  @Override
  public void put(E o) {
    if (o == null) {
      throw new NullPointerException();
    }
    boolean interrupted = false;
    try {
      synchronized (lock) {
        while (!offer(o)) {
          try {
            this.lock.wait();
          } catch (InterruptedException e) {
            // Keep waiting so the element is never silently dropped; re-assert the flag below.
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Retrieves and removes the next element in round-robin order, waiting until one is available.
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E take() throws InterruptedException {
    synchronized (lock) {
      while (this.size == 0) {
        this.lock.wait();
      }
      E element = poll();
      assert element != null;
      return element;
    }
  }

  /**
   * Retrieves and removes the next element in round-robin order, waiting up to the given timeout
   * for one to become available.
   * @return the element, or null if the timeout elapsed while the queue was still empty
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long endAt = deadlineAfter(timeout, unit);
    synchronized (lock) {
      long waitTime = endAt - EnvironmentEdgeManager.currentTimeMillis();
      while (this.size == 0 && waitTime > 0) {
        this.lock.wait(waitTime);
        waitTime = endAt - EnvironmentEdgeManager.currentTimeMillis();
      }
      return poll();
    }
  }

  /**
   * Retrieves and removes the first element of the current producer and moves the round-robin
   * pointer on to the next producer.
   * @return the element, or null if the queue is empty
   */
  @Override
  public E poll() {
    synchronized (lock) {
      ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
      while (iter.hasNext()) {
        ProducerList<E> tList = iter.next();
        if (tList.list.isEmpty()) {
          // Defensive cleanup of a producer that has nothing queued.
          iter.remove();
          this.producerMap.remove(tList.producer);
          adjustCurrentProducerPointer();
        } else {
          E element = tList.list.removeFirst();
          this.size--;
          assert element != null;
          // This is the round robin part. When we take an element from the current producer's
          // queue we move on to the next producer.
          if (tList.list.isEmpty()) {
            // Removing the list shifts the next producer into the current slot, so the pointer
            // only needs to be wrapped, not advanced.
            iter.remove();
            this.producerMap.remove(tList.producer);
            adjustCurrentProducerPointer();
          } else {
            incrementCurrentProducerPointer();
          }
          lock.notifyAll();
          return element;
        }
      }
      assert this.size == 0;
    }
    return null;
  }

  /**
   * Polls using the given producer key. The round-robin order of the remaining producers is left
   * untouched.
   * @return the first element queued by that producer, or null if it has nothing queued
   */
  protected E pollProducer(Object producer) {
    synchronized (lock) {
      ProducerList<E> tList = this.producerMap.get(producer);
      if (tList == null || tList.list.isEmpty()) {
        return null;
      }
      E element = tList.list.removeFirst();
      this.size--;
      if (tList.list.isEmpty()) {
        // Drops the list and keeps the pointer on the producer whose turn it currently is.
        discardProducer(tList);
      }
      lock.notifyAll();
      assert element != null;
      // Since this is only processing the given producer's work, we leave the round-robin part
      // alone and just return the work.
      return element;
    }
  }

  /**
   * Removes a single instance of the given element from whichever producer list holds it.
   * Overridden because the inherited implementation removes through {@link #iterator()}, which
   * walks a snapshot and therefore would not change the queue.
   * @return true if an element was removed
   */
  @Override
  public boolean remove(Object o) {
    if (o == null) {
      return false;
    }
    synchronized (lock) {
      ProducerList<E> owner = null;
      for (ProducerList<E> candidate : this.producerLists) {
        if (candidate.list.remove(o)) {
          owner = candidate;
          break;
        }
      }
      if (owner == null) {
        return false;
      }
      this.size--;
      if (owner.list.isEmpty()) {
        discardProducer(owner);
      }
      // Space was freed, so wake up any producer waiting for capacity.
      lock.notifyAll();
      return true;
    }
  }

  /**
   * Retrieves, but does not remove, the element that {@link #poll()} would return next.
   * @return the element, or null if the queue is empty
   */
  @Override
  public E peek() {
    synchronized (lock) {
      ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
      while (iter.hasNext()) {
        ProducerList<E> tList = iter.next();
        if (tList.list.isEmpty()) {
          // Defensive cleanup of a producer that has nothing queued.
          iter.remove();
          this.producerMap.remove(tList.producer);
          adjustCurrentProducerPointer();
        } else {
          E element = tList.list.getFirst();
          assert element != null;
          return element;
        }
      }
      assert this.size == 0;
    }
    return null;
  }

  /**
   * Removes all elements in round-robin order and adds them to the given collection.
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    if (c == null) {
      throw new NullPointerException();
    }
    if (c == this) {
      throw new IllegalArgumentException();
    }

    synchronized (this.lock) {
      int originalSize = this.size;
      int drained = drainTo(c, this.size);
      assert drained == originalSize;
      assert this.size == 0;
      assert this.producerLists.isEmpty();
      assert this.producerMap.isEmpty();
      return drained;
    }
  }

  /**
   * Removes at most {@code maxElements} elements in round-robin order and adds them to the given
   * collection.
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null) {
      throw new NullPointerException();
    }
    if (c == this) {
      throw new IllegalArgumentException();
    }

    synchronized (this.lock) {
      int transferred = 0;
      while (transferred < maxElements) {
        E element = poll();
        if (element == null) {
          break;
        }
        c.add(element);
        transferred++;
      }
      return transferred;
    }
  }

  /**
   * Returns how many more elements can be offered before the queue rejects them, or
   * {@link Integer#MAX_VALUE} when the queue has no effective bound.
   */
  @Override
  public int remainingCapacity() {
    synchronized (this.lock) {
      // offer() only rejects when size == maxSize, so a bound below the current size (which can
      // only be a negative bound) is never reached and the queue is effectively unbounded.
      if (this.maxSize == UNLIMITED_QUEUE_SIZE || this.maxSize < this.size) {
        return Integer.MAX_VALUE;
      }
      return this.maxSize - this.size;
    }
  }

  /** Returns the total number of queued elements across all producers. */
  @Override
  public int size() {
    synchronized (this.lock) {
      return this.size;
    }
  }

  /**
   * Computes the absolute deadline, in milliseconds, for a timed wait starting now. The sum
   * saturates at {@link Long#MAX_VALUE} so that very large timeouts do not overflow into the past.
   */
  private static long deadlineAfter(long timeout, TimeUnit unit) {
    long now = EnvironmentEdgeManager.currentTimeMillis();
    long millis = unit.toMillis(timeout);
    long deadline = now + millis;
    if (millis > 0 && deadline < now) {
      return Long.MAX_VALUE;
    }
    return deadline;
  }

  /**
   * Removes the given (emptied) producer list from the round-robin order and from the producer
   * map, keeping the current pointer on the producer it referred to before the removal.
   */
  private void discardProducer(ProducerList<E> emptied) {
    synchronized (lock) {
      int index = this.producerLists.indexOf(emptied);
      if (index >= 0) {
        this.producerLists.remove(index);
        if (index < this.currentProducer) {
          // Everything after the removed slot shifted left by one; follow it so the producer
          // whose turn it is does not get skipped.
          this.currentProducer--;
        }
      }
      this.producerMap.remove(emptied.producer);
      adjustCurrentProducerPointer();
    }
  }

  /** Advances the current pointer to the next producer, wrapping around at the end. */
  private void incrementCurrentProducerPointer() {
    synchronized (lock) {
      if (this.producerLists.size() == 0) {
        this.currentProducer = 0;
      } else {
        this.currentProducer = (this.currentProducer + 1) % this.producerLists.size();
      }
    }
  }

  /**
   * Adjusts the current pointer to a decrease in size.
   */
  private void adjustCurrentProducerPointer() {
    synchronized (lock) {
      if (this.producerLists.size() == 0) {
        this.currentProducer = 0;
      } else {
        this.currentProducer = (this.currentProducer) % this.producerLists.size();
      }
    }
  }

  /** The elements queued by a single producer, together with that producer's key. */
  private static class ProducerList<E> {
    public ProducerList(Object producer) {
      this.producer = producer;
      this.list = new LinkedList<E>();
    }

    private final Object producer;
    private final LinkedList<E> list;
  }
}
